package rabbitmq

import (
	"errors"
	"github.com/astaxie/beego/logs"
	"github.com/streadway/amqp"
	"log"
)

// IConsumer is the consumer interface.
type IConsumer interface {
	// Run starts the consumer.
	Run() error
}

// NewConsumer returns a Consumer configured with the given connection name
// and queue name. It only records the two names; no connection is opened
// until Connect is called.
func NewConsumer(connectionName string, queueName string) *Consumer {
	return &Consumer{
		connectionName: connectionName,
		queueName:      queueName,
	}
}

// Consumer holds the connection state and the callbacks of a queue consumer.
type Consumer struct {
	connectionName string                // connection name, passed to NewConnection by Connect
	queueName      string                // name of the queue that Get and Run read from
	messageHandler func(x amqp.Delivery) // callback Run invokes for each received delivery
	errHandler     func(err error) bool  // callback Run invokes on a channel close error; returning true makes Run return
	ch             *amqp.Channel         // channel obtained from conn by Connect
	conn           *amqp.Connection      // connection created by Connect
}

// Connect opens a connection via NewConnection using the consumer's
// connection name and then opens a channel on it.
//
// The consumer's conn and ch fields are only updated when both steps succeed,
// so a failed Connect never leaves the consumer half-initialised. If the
// channel cannot be opened, the connection that was just created is closed
// again instead of being left open.
//
// It returns the error from NewConnection or from opening the channel, or nil
// on success.
func (c *Consumer) Connect() (err error) {
	// Establish the connection.
	conn, err := NewConnection(c.connectionName)
	if err != nil {
		return err
	}

	// Open a channel on the new connection.
	ch, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup: the channel error is the one worth reporting,
		// so a secondary failure while closing is deliberately dropped.
		_ = conn.Close()
		return err
	}

	c.conn, c.ch = conn, ch
	return nil
}

// Close closes the consumer's connection and returns the error reported by
// the connection's Close method.
//
// Calling Close on a consumer that has no connection (Connect was never
// called, or did not produce one) is a no-op that returns nil rather than
// dereferencing a nil connection.
func (c *Consumer) Close() error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

// Get synchronously fetches a single message from the queue and returns its
// body. The second argument passed to the channel's Get is true (auto-ack in
// the amqp API).
//
// It returns (nil, err) when the fetch fails and (nil, nil) when the fetch
// reports that no message was available.
func (c *Consumer) Get() (data []byte, err error) {
	delivery, found, getErr := c.ch.Get(c.queueName, true)
	switch {
	case getErr != nil:
		return nil, getErr
	case !found:
		return nil, nil
	default:
		return delivery.Body, nil
	}
}

// Qos applies flow control to the consumer's channel by setting the prefetch
// count and prefetch size. The setting is applied with global=false.
//
// The method returns the consumer itself so calls can be chained. Because of
// that it cannot return an error: a failure is logged as a warning, including
// the underlying error, and otherwise ignored.
func (c *Consumer) Qos(prefetchCount, prefetchSize int) *Consumer {
	if err := c.ch.Qos(
		prefetchCount, // prefetch count
		prefetchSize,  // prefetch size
		false,         // global
	); err != nil {
		logs.Warn("[rabbitmq] setting qos failed: %v", err)
	}
	return c
}

// Run starts consuming from the queue and blocks, passing every delivery to
// the message handler. Deliveries are consumed with auto-ack disabled.
//
// Connect must have succeeded and both handlers must have been set, otherwise
// Run returns an error immediately. Once the connection check has passed, the
// channel and the connection are closed on every return path, including the
// missing-handler error.
//
// Run returns:
//   - the error from starting the consumer, if Consume fails;
//   - nil when the channel is closed gracefully;
//   - nil when the channel is closed with an error and the error handler
//     returns true;
//   - nil when the delivery stream ends (any close error already pending is
//     still reported to the error handler first).
//
// When the error handler returns false after a close error, Run keeps reading
// the delivery stream until it ends and then returns nil.
//
// NOTE: automatic reconnection on close code 406 used to be attempted here; it
// was disabled because the implementation was faulty.
func (c *Consumer) Run() (err error) {
	if c.conn == nil || c.ch == nil {
		return errors.New("call connect to initialization connection")
	}

	defer func() {
		// Best-effort cleanup: by the time Run returns the channel is usually
		// already closed, so close errors carry no useful information.
		_ = c.ch.Close()
		_ = c.conn.Close()
	}()

	if c.messageHandler == nil || c.errHandler == nil {
		return errors.New("you have not specified the handler")
	}

	// Capacity 1 so the library can hand over the close error without
	// blocking on this goroutine.
	notifyClose := c.ch.NotifyClose(make(chan *amqp.Error, 1))
	msgs, err := c.ch.Consume(
		c.queueName, // queue
		"",          // consumer
		false,       // auto-ack
		false,       // exclusive
		false,       // no-local
		false,       // no-wait
		nil,         // args
	)
	if err != nil {
		// Without a delivery stream there is nothing to wait for.
		return err
	}
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	for {
		select {
		case e, ok := <-notifyClose:
			if !ok {
				// Closed without an error value: graceful shutdown.
				return nil
			}
			if c.reportClose(e) {
				return nil
			}
			// The handler asked to keep going. The notification channel will
			// only ever report "closed" from now on, so stop selecting on it
			// and let the end of the delivery stream terminate the loop.
			notifyClose = nil
		case x, ok := <-msgs:
			if !ok {
				// The delivery stream has ended, so no further message can
				// arrive. If a close error is already buffered, make sure the
				// error handler still sees it before returning.
				select {
				case e, open := <-notifyClose:
					if open {
						c.reportClose(e)
					}
				default:
				}
				return nil
			}
			c.messageHandler(x)
		}
	}
}

// reportClose logs a channel close error and forwards it to the error
// handler. It returns the handler's result; true means Run should return.
func (c *Consumer) reportClose(e *amqp.Error) bool {
	logs.Error("error: %s", e.Error())
	return c.errHandler(e)
}

// SetMessageHandler registers the callback that Run invokes for each delivery
// it receives. It returns the consumer so calls can be chained.
func (c *Consumer) SetMessageHandler(handler func(x amqp.Delivery)) *Consumer {
	c.messageHandler = handler
	return c
}

// SetErrorHandler registers the callback that Run invokes when the channel
// reports a close error. When the callback returns true, Run returns. It
// returns the consumer so calls can be chained.
func (c *Consumer) SetErrorHandler(handler func(err error) bool) *Consumer {
	c.errHandler = handler
	return c
}
